FastAPI in Depth
The Surprising Part: Your FastAPI App Is an ASGI Application
Most engineers think of FastAPI as a "web framework." That mental model leads to shallow code. Here is what actually happens when a request arrives:
# What you think happens:
# request → FastAPI → your route function → response
# What actually happens:
# request → Uvicorn (ASGI server)
# → Your middleware stack (each is an ASGI app wrapping the next)
# → Starlette's routing layer
# → FastAPI's dependency injection resolver
# → Your route function
# → FastAPI's response serialisation
# → Exception handlers (on the way back up)
# ← Starlette serialises the Response object
# ← Middleware stack processes the response
# ← Uvicorn sends bytes over the socket
# FastAPI is built on Starlette.
# Starlette is an ASGI toolkit.
# ASGI middleware is just callables that wrap other callables.
# This means you can write raw ASGI middleware that runs
# before FastAPI even sees the request.
import uuid

class RawASGIMiddleware:
    def __init__(self, app):
        self.app = app  # The next ASGI app in the chain

    async def __call__(self, scope, receive, send):
        # scope: dict with request metadata (type, path, headers, ...)
        # receive: async callable to get the next message from the client
        # send: async callable to send a message to the client
        if scope["type"] == "http":
            # We can modify scope, buffer the body, measure timing...
            # "state" is not guaranteed to be in the scope yet, so create it if missing
            scope.setdefault("state", {})["request_id"] = str(uuid.uuid4())
        await self.app(scope, receive, send)  # Call the next layer
Understanding that FastAPI is a layered ASGI application - not a magic framework - is what separates engineers who debug production issues in minutes from those who spend hours.
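To make the layering concrete, here is a minimal sketch that drives an ASGI middleware by hand, with no server and no FastAPI involved. The names `echo_app`, `RequestIDASGIMiddleware`, and `call_asgi` are hypothetical helpers invented for this illustration; only the `(scope, receive, send)` calling convention comes from ASGI itself.

```python
import asyncio
import uuid


async def echo_app(scope, receive, send):
    """Hypothetical innermost ASGI app: replies with the request ID found in scope."""
    request_id = scope.get("state", {}).get("request_id", "missing")
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": request_id.encode()})


class RequestIDASGIMiddleware:
    """An ASGI app that wraps another ASGI app."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            scope.setdefault("state", {})["request_id"] = str(uuid.uuid4())
        await self.app(scope, receive, send)


async def call_asgi(app, path="/"):
    """Play the role of the server: build a scope, call the app, collect messages."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "GET", "path": path, "headers": []}
    await app(scope, receive, send)
    return sent


messages = asyncio.run(call_asgi(RequestIDASGIMiddleware(echo_app)))
status = messages[0]["status"]
body = messages[1]["body"].decode()

bare_messages = asyncio.run(call_asgi(echo_app))
bare_body = bare_messages[1]["body"].decode()
```

Calling `echo_app` directly yields the body `missing`, while the wrapped app sees the ID the middleware wrote into `scope`. Uvicorn does essentially what `call_asgi` does here, with real sockets behind `receive` and `send`.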
What You Will Learn
- Dependency Injection - Depends(), sub-dependencies, yield dependencies, overriding in tests
- Lifespan Events - replacing deprecated startup/shutdown with @asynccontextmanager
- Background Tasks - when to use FastAPI's BackgroundTasks vs Celery vs asyncio tasks
- Middleware - writing ASGI middleware, request ID injection, timing, correlation IDs, the ordering trap
- Exception Handlers - mapping domain exceptions to HTTP responses globally
- OpenAPI Customisation - hiding routes, custom schemas, security schemes, client SDK generation
- Production Patterns - uvicorn + gunicorn config, graceful shutdown, health and readiness probes
Prerequisites: Python async/await, Pydantic v2 basics, basic HTTP knowledge.
Part 1: Dependency Injection Deep Dive
Why Dependency Injection at All?
# WITHOUT dependency injection - hard to test, hard to swap implementations
from fastapi import FastAPI
import psycopg2
app = FastAPI()
@app.get("/documents/{doc_id}")
async def get_document(doc_id: str):
    # This is directly coupled to PostgreSQL.
    # How do you test this without a real database?
    # How do you swap to a different DB? You cannot without touching the route.
    # It also blocks the event loop: psycopg2 is a synchronous driver.
    conn = psycopg2.connect("postgresql://localhost/docs")
    with conn.cursor() as cur:  # psycopg2 connections have no execute(); use a cursor
        cur.execute("SELECT content FROM documents WHERE id = %s", (doc_id,))
        row = cur.fetchone()
    return {"id": doc_id, "content": row[0]}
FastAPI's Depends() system solves this cleanly.
Basic Depends() Pattern
# upload_service/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from typing import AsyncGenerator
DATABASE_URL = "postgresql+asyncpg://admin:secret@localhost:5432/documents"
engine = create_async_engine(DATABASE_URL, pool_size=10, max_overflow=20)
AsyncSessionFactory = async_sessionmaker(engine, expire_on_commit=False)
# This is a "yield dependency" - FastAPI calls it, iterates it once to get
# the session, runs your route, then resumes past the yield to handle cleanup.
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionFactory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# session.__aexit__ closes the connection back to the pool
# A simple value dependency - any callable works
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db_session), # Sub-dependency
) -> dict:
token = credentials.credentials
# Validate JWT, look up user in DB, etc.
user = await verify_token_and_get_user(token, db)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
)
return user
# Route uses the dependency - no DB or auth logic in the route itself
from fastapi import APIRouter
router = APIRouter()
@router.get("/documents/{doc_id}")
async def get_document(
doc_id: str,
db: AsyncSession = Depends(get_db_session),
current_user: dict = Depends(get_current_user),
):
doc = await db.get(Document, doc_id)
if not doc or doc.owner_id != current_user["id"]:
raise HTTPException(status_code=404, detail="Document not found")
return doc.to_dict()
Sub-Dependencies and the Dependency Graph
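FastAPI resolves dependencies as a DAG (Directed Acyclic Graph). If two route parameters both depend on get_db_session, FastAPI calls get_db_session once and shares the result within that request. This per-request caching is the default; pass Depends(get_db_session, use_cache=False) when a dependency must be called again each time it appears.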
# Both of these depend on get_db_session
# FastAPI calls get_db_session ONCE per request, not twice
async def get_user_service(db: AsyncSession = Depends(get_db_session)) -> UserService:
return UserService(db)
async def get_audit_service(db: AsyncSession = Depends(get_db_session)) -> AuditService:
return AuditService(db)
@router.post("/documents/")
async def create_document(
payload: DocumentCreate,
user_svc: UserService = Depends(get_user_service),
audit_svc: AuditService = Depends(get_audit_service),
# get_db_session is called only ONCE even though both services need it
):
...
This is why you should build service-layer classes that take a db session, rather than having routes call the DB directly.
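The per-request caching described above can be illustrated with a toy resolver. This is a sketch of the idea only, not FastAPI's implementation: the `Depends` class and `resolve` function below are simplified stand-ins written for this example, and the services are plain tuples.

```python
import inspect


class Depends:
    """Toy stand-in for fastapi.Depends: it only records the callable."""

    def __init__(self, dependency):
        self.dependency = dependency


def resolve(func, cache):
    """Call func, resolving Depends() defaults depth-first with a per-request cache."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, Depends):
            dep = param.default.dependency
            if dep not in cache:  # First time this request needs it
                cache[dep] = resolve(dep, cache)
            kwargs[name] = cache[dep]
    return func(**kwargs)


call_log = []


def get_db_session():
    call_log.append("get_db_session")
    return {"session_number": len(call_log)}


def get_user_service(db=Depends(get_db_session)):
    return ("UserService", db)


def get_audit_service(db=Depends(get_db_session)):
    return ("AuditService", db)


def create_document(
    user_svc=Depends(get_user_service),
    audit_svc=Depends(get_audit_service),
):
    return user_svc, audit_svc


# Each "request" gets a fresh cache, so the session is shared within a
# request but never across requests.
user_svc, audit_svc = resolve(create_document, cache={})
calls_after_first_request = len(call_log)
resolve(create_document, cache={})
calls_after_second_request = len(call_log)
```

Both services in the first request receive the same session object, and the second request triggers exactly one more call to `get_db_session`.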
Overriding Dependencies in Tests
The killer feature of FastAPI DI: you can replace any dependency in tests without monkey-patching.
# tests/test_documents.py
import pytest
from httpx import AsyncClient, ASGITransport
from upload_service.main import app
from upload_service.dependencies import get_db_session, get_current_user
# Document is the ORM model used by the route; import it from wherever your project defines it

# A fake DB session for tests
class FakeSession:
    async def get(self, model, pk):
        if pk == "doc-123":
            return Document(id="doc-123", content="hello", owner_id="user-1")
        return None

    async def commit(self): pass
    async def rollback(self): pass

@pytest.fixture
def fake_db():
    async def override():
        yield FakeSession()
    return override

@pytest.fixture
def fake_user():
    async def override():
        # Placeholder user; the id must match owner_id on the fake document
        return {"id": "user-1", "email": "user-1@example.com"}
    return override

@pytest.mark.asyncio
async def test_get_document(fake_db, fake_user):
    # Override dependencies - no real DB, no real auth
    app.dependency_overrides[get_db_session] = fake_db
    app.dependency_overrides[get_current_user] = fake_user
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            response = await client.get("/documents/doc-123")
        assert response.status_code == 200
        assert response.json()["id"] == "doc-123"
    finally:
        # Always clean up overrides, even when an assertion fails,
        # so they don't bleed into other tests
        app.dependency_overrides.clear()
Class-Based Dependencies
For complex dependencies with state, use a class with __call__:
# upload_service/dependencies.py
from functools import lru_cache
from fastapi import Depends, HTTPException, UploadFile
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")  # pydantic-settings v2 style

    database_url: str = "postgresql+asyncpg://localhost/docs"
    max_file_size_mb: int = 50
    allowed_extensions: list[str] = [".pdf", ".png", ".jpg", ".docx"]
    storage_bucket: str = "doc-intelligence-uploads"

@lru_cache  # Singleton - loaded once per process
def get_settings() -> Settings:
    return Settings()

class FileSizeChecker:
    """Reusable dependency that enforces configurable file size limits."""
    CHUNK_SIZE = 1024 * 1024  # Read 1 MB at a time

    def __init__(self, max_file_size_mb: int | None = None):
        # Plain constructor argument. Depends() in __init__ would NOT be resolved
        # here, because we instantiate the class ourselves; FastAPI only resolves
        # the signature it calls, which is __call__.
        self.max_file_size_mb = max_file_size_mb

    async def __call__(
        self,
        file: UploadFile,
        settings: Settings = Depends(get_settings),
    ) -> UploadFile:
        max_mb = self.max_file_size_mb or settings.max_file_size_mb
        max_bytes = max_mb * 1024 * 1024
        # Read chunk by chunk so we can stop as soon as the limit is exceeded
        total = 0
        while chunk := await file.read(self.CHUNK_SIZE):
            total += len(chunk)
            if total > max_bytes:
                raise HTTPException(
                    status_code=413,
                    detail=f"File too large. Maximum is {max_mb} MB",
                )
        # Reset position so the route can read the file again
        await file.seek(0)
        return file

# Usage:
@router.post("/documents/upload")
async def upload_document(
    file: UploadFile = Depends(FileSizeChecker()),
    current_user: dict = Depends(get_current_user),
):
    ...
Part 2: Lifespan Events
Why the Old Way Was Deprecated
# OLD WAY - on_event is deprecated in favour of lifespan handlers (available since FastAPI 0.93)
# Two separate functions: anything they share has to be parked on app.state
@app.on_event("startup")
async def startup():
app.state.db_pool = await create_pool(DATABASE_URL)
app.state.http_client = httpx.AsyncClient()
@app.on_event("shutdown")
async def shutdown():
await app.state.db_pool.close()
await app.state.http_client.aclose()
# If startup raised halfway through, shutdown might reference uninitialised state
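The problem: if startup raises an exception after creating the pool but before creating the HTTP client, the pool leaks. The lifespan pattern puts setup and teardown in one function, so you can use Python's native resource management (try/finally, async with, AsyncExitStack) to release whatever was already acquired. The decorator alone does not do this for you; the cleanup code still has to sit inside one of those constructs.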
The Modern Lifespan Pattern
# upload_service/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import httpx
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from upload_service.config import get_settings
from upload_service.storage import S3StorageClient
from upload_service.classifier_client import ClassifierGRPCClient
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Everything before `yield` runs at startup.
    Everything after `yield` runs at shutdown.
    Cleanup is only guaranteed for code inside `finally` (or an `async with`):
    a bare statement after `yield` is skipped if the app exits with an error.
    """
    settings = get_settings()
    # 1. Initialise DB connection pool
    engine = create_async_engine(
        settings.database_url,
        pool_size=10,
        max_overflow=20,
        pool_pre_ping=True,  # Validate connections before use (detects stale connections)
        pool_recycle=3600,   # Recycle connections older than 1 hour
    )
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    # 2. Initialise HTTP client (connection pool for downstream REST calls)
    http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(connect=2.0, read=30.0, write=10.0, pool=5.0),
        limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
    )
    # 3. Initialise gRPC channel to Classification Service
    # (if this raises, the engine and HTTP client above are not released by this function)
    classifier_client = ClassifierGRPCClient(host="classification-service", port=50051)
    await classifier_client.connect()
    # 4. Initialise S3 client
    storage = S3StorageClient(bucket=settings.storage_bucket)
    # 5. Make everything available to routes via app.state
    app.state.db_session_factory = session_factory
    app.state.http_client = http_client
    app.state.classifier_client = classifier_client
    app.state.storage = storage
    print("Upload Service: startup complete")
    try:
        yield  # <-- Application runs here
    finally:
        # Cleanup runs in REVERSE ORDER of initialisation (like __exit__ in nested with blocks)
        print("Upload Service: shutting down...")
        await classifier_client.close()
        await http_client.aclose()
        await engine.dispose()
        print("Upload Service: shutdown complete")
app = FastAPI(
title="Upload Service",
version="1.0.0",
lifespan=lifespan, # Pass the lifespan context manager
)
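One way to release resources even when startup fails halfway is `contextlib.AsyncExitStack`. The following is a minimal sketch under stated assumptions: `FakeResource` is a hypothetical stand-in for the engine, HTTP client, and gRPC channel, and the extra `resources` parameter exists only to make the demo self-contained (a real FastAPI lifespan function receives just `app`).

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []


class FakeResource:
    """Hypothetical stand-in for an engine, HTTP client, or gRPC channel."""

    def __init__(self, name, fail_on_connect=False):
        self.name = name
        self.fail_on_connect = fail_on_connect

    async def connect(self):
        if self.fail_on_connect:
            raise ConnectionError(f"{self.name} unavailable")
        events.append(f"open {self.name}")

    async def close(self):
        events.append(f"close {self.name}")


@asynccontextmanager
async def lifespan(app, resources):
    async with AsyncExitStack() as stack:
        for resource in resources:
            await resource.connect()
            # Callbacks run in reverse registration order when the stack exits,
            # whether startup failed, the app crashed, or shutdown was clean.
            stack.push_async_callback(resource.close)
        yield


async def run(resources):
    try:
        async with lifespan(None, resources):
            events.append("serving")
    except ConnectionError:
        events.append("startup failed")


# The third resource fails to connect, so the first two must be closed again.
asyncio.run(run([
    FakeResource("db"),
    FakeResource("http"),
    FakeResource("grpc", fail_on_connect=True),
]))
failed_startup_events = list(events)

events.clear()
asyncio.run(run([FakeResource("db"), FakeResource("http")]))
clean_run_events = list(events)
```

In the failing run, the two resources that were opened are closed in reverse order before the error propagates. Each resource is registered for cleanup right after it is acquired, so there is no window in which it can leak.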
Accessing Lifespan State in Dependencies
# upload_service/dependencies.py
from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession
async def get_db_session(request: Request):
"""Get a DB session from the pool initialised in lifespan."""
factory = request.app.state.db_session_factory
async with factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
async def get_http_client(request: Request) -> httpx.AsyncClient:
"""Get the shared HTTP client initialised in lifespan."""
return request.app.state.http_client
async def get_classifier(request: Request) -> ClassifierGRPCClient:
return request.app.state.classifier_client
Loading an ML Model at Startup
A common pattern for ML serving services:
# classification_service/main.py
import torch
from contextlib import asynccontextmanager
from fastapi import FastAPI
import json

@asynccontextmanager
async def lifespan(app: FastAPI):
    print("Loading classification model...")
    # Load model weights - this can take 5–30 seconds
    # We want this to happen BEFORE the first request arrives
    # weights_only=False is needed to unpickle a full model object on PyTorch 2.6+;
    # only do this with files you trust
    model = torch.load("models/classifier_v3.pt", map_location="cpu", weights_only=False)
    model.eval()
    # Also load the label mapping
    with open("models/label_map.json") as f:
        label_map = json.load(f)
    # Store in app.state - shared by every request this worker process handles.
    # Each worker process runs lifespan itself and loads its own copy of the model.
    app.state.model = model
    app.state.label_map = label_map
    app.state.model_version = "v3"
    print(f"Model loaded. Classes: {len(label_map)}")
    yield
    # On shutdown, free GPU/CPU memory explicitly
    del app.state.model
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    print("Model unloaded.")
app = FastAPI(lifespan=lifespan)
Part 3: Background Tasks vs Asyncio Tasks vs Celery
The three options have very different semantics. Choosing the wrong one is a common production mistake.
Comparison Table
| Feature | BackgroundTasks | asyncio.create_task() | Celery |
|---|---|---|---|
| When task runs | After HTTP response is sent | Immediately, concurrently | In a separate worker process |
| Task survives process restart | No | No | Yes (if using persistent broker) |
| Can be retried automatically | No | No | Yes |
| Result can be queried | No | Via asyncio.Task object | Yes (via Celery result backend) |
| Suitable for | Fire-and-forget notifications | Concurrent I/O within a request | Long-running, reliable jobs |
| Max task duration | Seconds (still in web process) | Limited by request lifetime | Minutes to hours |
| Monitoring | None built-in | None built-in | Flower, built-in events |
FastAPI BackgroundTasks
# upload_service/routes/upload.py
from fastapi import APIRouter, BackgroundTasks, Depends, UploadFile
from upload_service.dependencies import get_current_user, get_db_session
router = APIRouter()
async def send_upload_notification(user_email: str, filename: str, doc_id: str):
"""
Runs AFTER the HTTP response is sent to the client.
The client does not wait for this.
If the process crashes, this task is lost.
Use for: audit logging, non-critical notifications, analytics events.
"""
import httpx
async with httpx.AsyncClient() as client:
await client.post(
"http://notification-service/internal/notify",
json={
"recipient": user_email,
"template": "upload_complete",
"data": {"filename": filename, "doc_id": doc_id},
},
)
@router.post("/documents/upload", status_code=202)
async def upload_document(
file: UploadFile,
background_tasks: BackgroundTasks,
current_user: dict = Depends(get_current_user),
db: AsyncSession = Depends(get_db_session),
):
# 1. Do the synchronous work that the client needs to wait for
doc_id = await store_document(file, current_user["id"], db)
# 2. Schedule the notification - client gets 202 immediately
# This runs in the same process but after the response is sent
background_tasks.add_task(
send_upload_notification,
user_email=current_user["email"],
filename=file.filename,
doc_id=doc_id,
)
return {"doc_id": doc_id, "status": "processing"}
When to use BackgroundTasks: Non-critical, fast (< 1 second), fire-and-forget tasks where loss is acceptable. Sending a webhook, logging an analytics event.
When NOT to use BackgroundTasks: Anything that must succeed, anything that takes more than a second, anything that needs to be retried or monitored.
asyncio.create_task() for Concurrent I/O
# Useful when you need to do several I/O operations simultaneously
# and you want to return after ALL of them complete
import asyncio

@router.post("/documents/batch-check")
async def check_documents_exist(doc_ids: list[str], storage=Depends(get_storage)):
    # Check all requested documents in S3 concurrently instead of sequentially
    async def check_one(doc_id: str) -> tuple[str, bool]:
        exists = await storage.exists(f"raw/{doc_id}")
        return doc_id, exists

    # Every check is scheduled up front, so they all run concurrently
    tasks = [asyncio.create_task(check_one(doc_id)) for doc_id in doc_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # With return_exceptions=True, failed checks come back as exception objects.
    # They are skipped here, so a failed lookup is simply absent from the response.
    return {
        result[0]: result[1]
        for result in results
        if not isinstance(result, BaseException)
    }
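A batch endpoint like this fires one lookup per ID, so a large request can open hundreds of connections at once. The sketch below shows one common way to bound that with `asyncio.Semaphore`, and to report failures separately rather than dropping them. The `exists` coroutine is a hypothetical stand-in for the storage client, and the limit of 5 is an arbitrary example value.

```python
import asyncio


async def exists(doc_id: str) -> bool:
    """Hypothetical storage lookup; raises for one ID to show error handling."""
    await asyncio.sleep(0.01)
    if doc_id == "doc-bad":
        raise TimeoutError("storage timed out")
    return doc_id.startswith("doc-")


async def check_all(doc_ids, max_concurrency=5):
    semaphore = asyncio.Semaphore(max_concurrency)

    async def check_one(doc_id):
        async with semaphore:  # At most max_concurrency lookups in flight
            return await exists(doc_id)

    results = await asyncio.gather(
        *(check_one(doc_id) for doc_id in doc_ids), return_exceptions=True
    )
    found, failed = {}, {}
    # gather() returns results in the same order as its inputs
    for doc_id, result in zip(doc_ids, results):
        if isinstance(result, BaseException):
            failed[doc_id] = repr(result)
        else:
            found[doc_id] = result
    return found, failed


found, failed = asyncio.run(check_all(["doc-1", "x-2", "doc-bad"]))
```

Because `gather` preserves input order, `check_one` does not need to return the ID alongside the result; zipping against `doc_ids` is enough.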
Celery for Reliable Background Work
# celery_app.py - in a real service, this is a separate process
from celery import Celery
celery_app = Celery(
"doc_processing",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
celery_app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
task_acks_late=True, # Ack only after task completes (safer)
task_reject_on_worker_lost=True, # Re-queue if worker dies mid-task
worker_prefetch_multiplier=1, # One task at a time per worker (for CPU-bound)
)
@celery_app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
name="tasks.run_ocr",
)
def run_ocr_task(self, doc_id: str, storage_key: str):
"""
Runs in a Celery worker process. Has access to retry logic.
Safe to use for CPU-heavy work (no GIL issues - separate process).
"""
try:
# Download from S3
file_bytes = storage.get(storage_key)
# Run OCR - potentially 30 seconds
text = ocr_engine.extract_text(file_bytes)
# Store results
db.execute("UPDATE documents SET ocr_text = %s WHERE id = %s", (text, doc_id))
return {"doc_id": doc_id, "text_length": len(text)}
except OCREngineError as exc:
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)
# FastAPI route enqueues the Celery task
@router.post("/documents/process/{doc_id}")
async def trigger_processing(doc_id: str, current_user: dict = Depends(get_current_user)):
task = run_ocr_task.delay(doc_id=doc_id, storage_key=f"raw/{doc_id}")
return {"task_id": task.id, "status": "queued"}
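The `countdown` expression in the task above is easy to misread, so here is the arithmetic spelled out as a small sketch. The `cap_seconds` parameter is an addition for illustration and is not part of the original task; it is a common safeguard so that a higher `max_retries` cannot produce multi-hour delays.

```python
def retry_countdown(retries: int, base_seconds: int = 30, cap_seconds: int = 600) -> int:
    """Delay before the next attempt; `retries` is the number of retries already made."""
    # ** binds tighter than *, so this is (2 ** retries) * base_seconds
    return min(cap_seconds, 2 ** retries * base_seconds)


# With max_retries=3, self.request.retries is 0, 1, 2 when retry() is called
schedule = [retry_countdown(n) for n in range(3)]
total_wait = sum(schedule)
capped = retry_countdown(10)
```

With these values the task waits 30, 60, and then 120 seconds between attempts, for 210 seconds of total backoff. An explicit `countdown` takes precedence over `default_retry_delay`, so the 60 set in the decorator is not used on this code path.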
Part 4: Middleware
The Middleware Execution Order Trap
This is the most common FastAPI middleware mistake:
# WRONG ORDER - results in confusing behaviour
app.add_middleware(RequestIDMiddleware) # Added first
app.add_middleware(TimingMiddleware) # Added second
app.add_middleware(CORSMiddleware, ...) # Added third
# ACTUAL EXECUTION ORDER IS REVERSED:
# Request: CORSMiddleware → TimingMiddleware → RequestIDMiddleware → route
# Response: RequestIDMiddleware → TimingMiddleware → CORSMiddleware → client
# So if TimingMiddleware needs the request_id set by RequestIDMiddleware,
# it won't have it - because RequestIDMiddleware runs AFTER it on the request path!
# CORRECT ORDER for "RequestID then Timing then CORS":
app.add_middleware(CORSMiddleware, ...) # Added first = innermost = runs last on request
app.add_middleware(TimingMiddleware) # Added second
app.add_middleware(RequestIDMiddleware) # Added last = outermost = runs first on request
The rule: the last middleware added is the outermost layer, so it is the first to see the request and the last to see the response. Add middleware in reverse order of desired request-path execution.
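The ordering rule is easier to trust once you see why it holds. The toy model below mirrors, in simplified form, how Starlette assembles the stack: each `add_middleware` call prepends to a list, and the stack is built by wrapping from the end of that list inwards. `ToyApp` and `tracing` are hypothetical names for this sketch, not Starlette classes.

```python
import asyncio


class ToyApp:
    """Toy model of the stack assembly: add_middleware() prepends to the list."""

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.user_middleware = []

    def add_middleware(self, factory):
        self.user_middleware.insert(0, factory)

    def build(self):
        app = self.endpoint
        # Wrap from the last list entry (first added) to the first (last added),
        # so the last-added middleware ends up as the outermost layer.
        for factory in reversed(self.user_middleware):
            app = factory(app)
        return app


def tracing(name, log):
    """Build a middleware factory that records when it sees request and response."""
    def factory(app):
        async def middleware(request):
            log.append(f"{name} request")
            response = await app(request)
            log.append(f"{name} response")
            return response
        return middleware
    return factory


log = []


async def route(request):
    log.append("route")
    return "ok"


toy = ToyApp(route)
toy.add_middleware(tracing("CORS", log))       # Added first
toy.add_middleware(tracing("Timing", log))     # Added second
toy.add_middleware(tracing("RequestID", log))  # Added last
result = asyncio.run(toy.build()("GET /"))
```

The log shows `RequestID` seeing the request first and the response last, with `CORS` closest to the route.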
Request ID Middleware
# upload_service/middleware/request_id.py
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
class RequestIDMiddleware(BaseHTTPMiddleware):
"""
Assigns a unique ID to every request.
Propagates incoming X-Request-ID if provided (for distributed tracing).
Adds X-Request-ID to every response.
"""
async def dispatch(self, request: Request, call_next) -> Response:
# Use incoming ID if present (from API gateway or upstream service)
request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
# Store in request state for access in routes and other middleware
request.state.request_id = request_id
response = await call_next(request)
# Echo the request ID back in the response
response.headers["X-Request-ID"] = request_id
return response
Timing Middleware
# upload_service/middleware/timing.py
import time
import logging
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
logger = logging.getLogger("upload_service.timing")
class TimingMiddleware(BaseHTTPMiddleware):
"""
Measures request processing time.
Logs slow requests.
Adds X-Process-Time header to responses.
"""
SLOW_REQUEST_THRESHOLD_MS = 500
async def dispatch(self, request: Request, call_next) -> Response:
start = time.perf_counter()
response = await call_next(request)
duration_ms = (time.perf_counter() - start) * 1000
response.headers["X-Process-Time"] = f"{duration_ms:.2f}ms"
log_data = {
"method": request.method,
"path": request.url.path,
"status": response.status_code,
"duration_ms": round(duration_ms, 2),
"request_id": getattr(request.state, "request_id", "unknown"),
}
if duration_ms > self.SLOW_REQUEST_THRESHOLD_MS:
logger.warning("Slow request detected", extra=log_data)
else:
logger.info("Request handled", extra=log_data)
return response
Correlation ID Propagation (Distributed Tracing Context)
# upload_service/middleware/correlation.py
# When upload-service calls classification-service,
# the correlation ID must be forwarded so logs can be joined
import contextvars
import uuid
import httpx
from starlette.middleware.base import BaseHTTPMiddleware
# Module-level context variable - safe for concurrent asyncio use
correlation_id_ctx: contextvars.ContextVar[str] = contextvars.ContextVar(
"correlation_id", default=""
)
class CorrelationIDMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
correlation_id = (
request.headers.get("X-Correlation-ID")
or request.headers.get("X-Request-ID")
or str(uuid.uuid4())
)
# Set in contextvars - available everywhere in this request's coroutine tree
token = correlation_id_ctx.set(correlation_id)
try:
response = await call_next(request)
response.headers["X-Correlation-ID"] = correlation_id
return response
finally:
correlation_id_ctx.reset(token)
# Anywhere in the codebase, during a request:
def get_correlation_id() -> str:
return correlation_id_ctx.get()
# When making an outbound HTTP call to another service:
async def call_classification_service(text: str, http_client: httpx.AsyncClient):
response = await http_client.post(
"http://classification-service/classify",
json={"text": text},
headers={
"X-Correlation-ID": get_correlation_id(), # Forward context
},
)
return response.json()
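The reason a module-level `ContextVar` is safe under concurrency is that every asyncio task runs in its own copy of the context. A minimal sketch, using hypothetical `handle_request` and `downstream_call` coroutines in place of the middleware and the outbound HTTP call:

```python
import asyncio
import contextvars

correlation_id_ctx = contextvars.ContextVar("correlation_id", default="")


async def downstream_call():
    await asyncio.sleep(0.01)  # Yield control so the two requests interleave
    return correlation_id_ctx.get()


async def handle_request(correlation_id):
    token = correlation_id_ctx.set(correlation_id)
    try:
        return await downstream_call()
    finally:
        correlation_id_ctx.reset(token)


async def main():
    # gather() runs each coroutine in its own task, each with its own context copy
    return await asyncio.gather(handle_request("req-a"), handle_request("req-b"))


seen = asyncio.run(main())
outside_any_request = correlation_id_ctx.get()
```

Each request reads back its own ID even though both are in flight at the same time, and the value outside any request stays at the default. A plain module-level global would have been overwritten by whichever request set it last.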
Writing Raw ASGI Middleware (Maximum Performance)
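BaseHTTPMiddleware adds overhead: every layer wraps the request in Request/Response objects and runs the rest of the app in a separate task connected by an in-memory stream. For hot paths, write raw ASGI middleware, which works directly on scope, receive, and send: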
# upload_service/middleware/raw_auth_check.py
# Example: reject requests without Authorization header BEFORE FastAPI even sees them
class EarlyAuthCheckMiddleware:
"""
Raw ASGI middleware - runs before FastAPI's routing.
Rejects unauthenticated requests at the lowest possible layer.
No request body buffering.
"""
EXCLUDED_PATHS = {"/health", "/readiness", "/openapi.json", "/docs", "/redoc"}
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
path = scope.get("path", "")
if path in self.EXCLUDED_PATHS:
await self.app(scope, receive, send)
return
# Check for Authorization header in scope
headers = dict(scope.get("headers", []))
if b"authorization" not in headers:
# Reject at the ASGI layer - fastest possible rejection
await send({
"type": "http.response.start",
"status": 401,
"headers": [
[b"content-type", b"application/json"],
[b"www-authenticate", b'Bearer realm="upload-service"'],
],
})
await send({
"type": "http.response.body",
"body": b'{"detail":"Authorization header required"}',
})
return
await self.app(scope, receive, send)
# Add to app AFTER FastAPI middlewares (so it runs BEFORE them)
app.add_middleware(EarlyAuthCheckMiddleware)
Assembling the Middleware Stack
# upload_service/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from .middleware.correlation import CorrelationIDMiddleware
from .middleware.timing import TimingMiddleware
from .middleware.request_id import RequestIDMiddleware
from .middleware.raw_auth_check import EarlyAuthCheckMiddleware
app = FastAPI(title="Upload Service", lifespan=lifespan)
# Order of add_middleware calls = reverse of execution order
# Execution order for a request: RequestID → Timing → Correlation → GZip → CORS → FastAPI
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.example.com"],
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=1000) # Compress responses > 1 KB
app.add_middleware(CorrelationIDMiddleware)
app.add_middleware(TimingMiddleware)
app.add_middleware(RequestIDMiddleware)
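# Raw ASGI middleware - added last, so it is the outermost layer and runs
# BEFORE all other middleware. Do not assign to app.middleware_stack directly:
# it is None until the first request builds the stack.
# Being outside CORSMiddleware, this layer also sees CORS preflight (OPTIONS)
# requests, which carry no Authorization header.
app.add_middleware(EarlyAuthCheckMiddleware)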
Part 5: Custom Exception Handlers
The Problem With Unhandled Exceptions
# Without exception handlers, an unhandled exception produces a bare
# 500 response with the plain-text body "Internal Server Error".
# (The JSON {"detail": ...} shape is only used for HTTPException.)
# This leaks no internal info - which is good for security - but it tells
# the client nothing useful and gives support nothing to correlate with logs.
# The solution: map domain exceptions to structured HTTP responses.
Domain Exception Hierarchy
# upload_service/exceptions.py
class UploadServiceError(Exception):
"""Base exception for all Upload Service errors."""
status_code: int = 500
error_code: str = "INTERNAL_ERROR"
class DocumentNotFoundError(UploadServiceError):
status_code = 404
error_code = "DOCUMENT_NOT_FOUND"
def __init__(self, doc_id: str):
self.doc_id = doc_id
super().__init__(f"Document {doc_id} not found")
class FileTooLargeError(UploadServiceError):
status_code = 413
error_code = "FILE_TOO_LARGE"
def __init__(self, size_bytes: int, max_bytes: int):
self.size_bytes = size_bytes
self.max_bytes = max_bytes
super().__init__(
f"File size {size_bytes} exceeds maximum {max_bytes}"
)
class UnsupportedFileTypeError(UploadServiceError):
status_code = 422
error_code = "UNSUPPORTED_FILE_TYPE"
def __init__(self, filename: str, allowed: list[str]):
self.filename = filename
self.allowed = allowed
super().__init__(f"File type not supported. Allowed: {allowed}")
class ClassificationServiceUnavailableError(UploadServiceError):
status_code = 503
error_code = "CLASSIFICATION_SERVICE_UNAVAILABLE"
Structured Error Response Model
# upload_service/schemas/errors.py
from pydantic import BaseModel
from typing import Any
class ErrorDetail(BaseModel):
"""Structured error response - consistent across all endpoints."""
error_code: str # Machine-readable code for client logic
message: str # Human-readable message
request_id: str # For support ticket correlation
details: dict[str, Any] | None = None # Extra context (field errors, etc.)
class ValidationErrorDetail(BaseModel):
"""Pydantic validation error response."""
error_code: str = "VALIDATION_ERROR"
message: str = "Request validation failed"
request_id: str
errors: list[dict] # Field-level errors from Pydantic
Registering Exception Handlers
# upload_service/exception_handlers.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
import logging
from .exceptions import UploadServiceError
from .schemas.errors import ErrorDetail, ValidationErrorDetail
logger = logging.getLogger("upload_service.errors")
def get_request_id(request: Request) -> str:
return getattr(request.state, "request_id", "unknown")
async def upload_service_exception_handler(
request: Request, exc: UploadServiceError
) -> JSONResponse:
"""Handle all domain-specific exceptions."""
request_id = get_request_id(request)
logger.error(
f"Domain error: {exc.error_code}",
extra={
"error_code": exc.error_code,
"request_id": request_id,
"path": request.url.path,
},
exc_info=exc,
)
return JSONResponse(
status_code=exc.status_code,
content=ErrorDetail(
error_code=exc.error_code,
message=str(exc),
request_id=request_id,
).model_dump(),
)
async def validation_exception_handler(
request: Request, exc: RequestValidationError
) -> JSONResponse:
"""Handle Pydantic validation errors with field-level details."""
return JSONResponse(
status_code=422,
content=ValidationErrorDetail(
request_id=get_request_id(request),
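# exc.errors() may contain values that are not JSON-serialisable (for example
# exception objects under "ctx"); FastAPI's built-in handler runs the list
# through fastapi.encoders.jsonable_encoder before returning it
errors=exc.errors(),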
).model_dump(),
)
async def generic_exception_handler(
request: Request, exc: Exception
) -> JSONResponse:
"""Catch-all handler - never expose internal details to clients."""
request_id = get_request_id(request)
logger.critical(
"Unhandled exception",
extra={"request_id": request_id, "path": request.url.path},
exc_info=exc,
)
return JSONResponse(
status_code=500,
content=ErrorDetail(
error_code="INTERNAL_ERROR",
message="An unexpected error occurred. Please try again.",
request_id=request_id,
).model_dump(),
)
# Register in main.py:
def register_exception_handlers(app: FastAPI) -> None:
app.add_exception_handler(UploadServiceError, upload_service_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, generic_exception_handler)
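Registering a handler for the base class `UploadServiceError` covers every subclass because the handler is looked up by walking the exception's class hierarchy, most specific class first. The sketch below models that lookup in plain Python. It is a simplified illustration rather than Starlette's code, and the two handler functions return `(status, body)` tuples instead of real responses.

```python
class UploadServiceError(Exception):
    status_code = 500
    error_code = "INTERNAL_ERROR"


class DocumentNotFoundError(UploadServiceError):
    status_code = 404
    error_code = "DOCUMENT_NOT_FOUND"


def handle_domain(exc):
    return exc.status_code, {"error_code": exc.error_code, "message": str(exc)}


def handle_generic(exc):
    # Never echo str(exc) here: it may contain internal details
    return 500, {"error_code": "INTERNAL_ERROR", "message": "An unexpected error occurred."}


# Registration order does not matter; the class hierarchy decides.
handlers = {Exception: handle_generic, UploadServiceError: handle_domain}


def lookup_handler(exc):
    """Return the handler registered for the most specific matching class."""
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls]
    return None


def to_response(exc):
    return lookup_handler(exc)(exc)


not_found = to_response(DocumentNotFoundError("Document doc-1 not found"))
unexpected = to_response(KeyError("secret_column"))
```

The domain exception keeps its own status and code, while the unexpected `KeyError` falls through to the generic handler and its message is not exposed.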
Part 6: OpenAPI Customisation
Customising the OpenAPI Schema
# upload_service/main.py
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
schema = get_openapi(
title="Upload Service API",
version="1.2.0",
description="""
## Upload Service
Handles document ingestion for the Document Intelligence Platform.
### Authentication
All endpoints (except `/health`) require a Bearer JWT token.
### Rate Limits
- Upload endpoints: 100 requests/minute per user
- Query endpoints: 1000 requests/minute per user
### Supported File Types
`.pdf`, `.png`, `.jpg`, `.docx`
""",
contact={
"name": "Platform Engineering Team",
},
license_info={
"name": "Proprietary",
},
routes=app.routes,
)
# Add security scheme
schema.setdefault("components", {})["securitySchemes"] = {
"BearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "JWT token from the Auth Service",
}
}
# Apply security globally to all operations
for path_data in schema["paths"].values():
for operation in path_data.values():
if isinstance(operation, dict):
# Skip health checks. This only matches routes declared with an explicit
# operation_id; auto-generated IDs look like "health_check_health_get".
if operation.get("operationId") not in {"health_check", "readiness_check"}:
operation.setdefault("security", [{"BearerAuth": []}])
# Add common response schemas to all operations
for path_data in schema["paths"].values():
for operation in path_data.values():
if isinstance(operation, dict):
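responses = operation.setdefault("responses", {})
# setdefault keeps any response the route already documents for these codes
for code, text in {"401": "Unauthorized", "429": "Too Many Requests", "500": "Internal Server Error"}.items():
    responses.setdefault(code, {"description": text})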
app.openapi_schema = schema
return schema
app.openapi = custom_openapi
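Because the OpenAPI schema is an ordinary dict, the post-processing step can be developed and unit-tested without starting the app. A minimal sketch follows; the function name `harden_schema` and the sample schema are made up for this illustration, and the operation IDs follow the auto-generated naming pattern.

```python
HTTP_METHODS = {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
COMMON_RESPONSES = {
    "401": "Unauthorized",
    "429": "Too Many Requests",
    "500": "Internal Server Error",
}


def harden_schema(schema, public_operation_ids=frozenset()):
    """Add a bearer scheme and default responses without clobbering existing entries."""
    components = schema.setdefault("components", {})  # May be absent if no models exist
    components.setdefault("securitySchemes", {})["BearerAuth"] = {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT",
    }
    for path_item in schema.get("paths", {}).values():
        for method, operation in path_item.items():
            if method not in HTTP_METHODS:  # Skip keys such as "parameters"
                continue
            if operation.get("operationId") in public_operation_ids:
                continue
            operation.setdefault("security", [{"BearerAuth": []}])
            responses = operation.setdefault("responses", {})
            for code, description in COMMON_RESPONSES.items():
                responses.setdefault(code, {"description": description})
    return schema


schema = harden_schema(
    {
        "openapi": "3.1.0",
        "paths": {
            "/health": {"get": {"operationId": "health_check_health_get", "responses": {}}},
            "/documents": {
                "get": {
                    "operationId": "list_documents_documents_get",
                    "responses": {"401": {"description": "Token expired"}},
                }
            },
        },
    },
    public_operation_ids={"health_check_health_get"},
)
health = schema["paths"]["/health"]["get"]
documents = schema["paths"]["/documents"]["get"]
```

The health check stays free of the security requirement, and the route-specific 401 description survives because `setdefault` only fills in codes that are missing.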
Hiding Internal Routes from Public OpenAPI
# Some routes are for internal service-to-service calls only
# and should not appear in the public API docs
# Method 1: include_in_schema=False on individual routes
@router.post("/internal/trigger-reprocess", include_in_schema=False)
async def trigger_reprocess(doc_id: str):
"""Called by Processing Service only. Not public."""
...
# Method 2: Separate routers with different OpenAPI settings
internal_router = APIRouter(prefix="/internal", include_in_schema=False)
@internal_router.post("/reset-document-state")
async def reset_document_state(doc_id: str):
...
# Method 3: Filter by tag when generating the spec
# Only include routes tagged "public" in the client-facing spec
def public_openapi():
schema = get_openapi(
routes=[r for r in app.routes if "public" in getattr(r, "tags", [])],
title="Upload Service - Public API",
version="1.0.0",
)
return schema
Custom Response Schemas and Examples
# upload_service/schemas/documents.py
from pydantic import BaseModel, Field
from typing import Literal
from datetime import datetime
class DocumentUploadResponse(BaseModel):
    # Pydantic v2: pass a list via `examples`; the singular `example` keyword is deprecated
    doc_id: str = Field(..., examples=["doc_01HN3K8V2NKDQ7HTXJFR9CPWK"])
    status: Literal["queued", "processing", "complete", "failed"]
    filename: str = Field(..., examples=["invoice_2024_01.pdf"])
    size_bytes: int = Field(..., examples=[524288])
    storage_key: str = Field(..., examples=["raw/user-123/invoice_2024_01.pdf"])
    created_at: datetime
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "doc_id": "doc_01HN3K8V2NKDQ7HTXJFR9CPWK",
                    "status": "queued",
                    "filename": "invoice_2024_01.pdf",
                    "size_bytes": 524288,
                    "storage_key": "raw/user-123/invoice_2024_01.pdf",
                    "created_at": "2024-01-15T10:30:00Z",
                }
            ]
        }
    }
# Use in route - FastAPI reflects this in the OpenAPI schema
@router.post(
    "/documents/upload",
    response_model=DocumentUploadResponse,
    status_code=202,
    summary="Upload a document for processing",
    description="""
Upload a document file (PDF, PNG, JPG, DOCX) for processing.
The document is stored and queued for OCR and classification.
Processing is asynchronous - poll `/documents/{doc_id}` for status.
**Rate limit**: 100 uploads/minute per user.
""",
    responses={
        202: {"description": "Document accepted and queued for processing"},
        413: {"description": "File exceeds size limit (50 MB)"},
        422: {"description": "Unsupported file type or malformed request"},
    },
)
async def upload_document(
    file: UploadFile,
    background_tasks: BackgroundTasks,
    current_user: dict = Depends(get_current_user),
):
    ...
Generating a Typed Python Client SDK
# Install openapi-generator-cli (requires Java, or use Docker)
npm install @openapitools/openapi-generator-cli -g
# Export the OpenAPI schema from the running service
curl http://localhost:8001/openapi.json > upload-service-openapi.json
# Generate a Python client
openapi-generator-cli generate \
    -i upload-service-openapi.json \
    -g python \
    -o clients/upload-service-client \
    --additional-properties=packageName=upload_service_client,projectName=upload-service-client
# The generated client is in clients/upload-service-client/
# Other services (or frontend) can now use typed calls:
# Generated client usage in another service
from upload_service_client import ApiClient, Configuration, DocumentsApi
config = Configuration(host="http://upload-service:8001")
config.access_token = "eyJhbGciOiJSUzI1NiIs..."
with ApiClient(config) as api_client:
    documents_api = DocumentsApi(api_client)
    # Fully typed - IDE autocomplete works
    doc_status = documents_api.get_document(doc_id="doc_01HN3K8V2NKDQ7HTXJFR9CPWK")
    print(f"Status: {doc_status.status}")  # doc_status is a generated model (e.g. DocumentStatus)
Part 7: Production Patterns
Uvicorn + Gunicorn Configuration
# gunicorn.conf.py - for production deployment
import multiprocessing
# Common starting point: worker processes = (2 × CPU cores) + 1
# With async workers (uvicorn) one process already serves many concurrent
# requests, so extra processes mainly add parallelism for CPU-bound
# (GIL-bound) work and let the service use more than one core
workers = multiprocessing.cpu_count() * 2 + 1
# Use uvicorn's worker class (handles ASGI apps)
worker_class = "uvicorn.workers.UvicornWorker"
# Timeouts
timeout = 120 # Restart a worker that has not notified the master for 2 minutes (for async workers this is a worker liveness check, not a per-request limit)
graceful_timeout = 30 # Allow 30 seconds for in-flight requests on shutdown
keepalive = 5 # Seconds to wait for the next request on a keep-alive connection
# Connections
bind = "0.0.0.0:8001"
backlog = 2048
# Logging
accesslog = "-" # stdout
errorlog = "-" # stdout
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(L)s'
# Restart each worker after about 1000 requests to limit the impact of memory leaks
max_requests = 1000
max_requests_jitter = 100 # Randomise the restart point per worker so they do not all restart at once
# Start the server
gunicorn upload_service.main:app -c gunicorn.conf.py
# Or with uvicorn directly for development
uvicorn upload_service.main:app --host 0.0.0.0 --port 8001 --reload
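One caveat with the `workers` formula in `gunicorn.conf.py`: `multiprocessing.cpu_count()` reports the host's CPUs, so inside a container with a CPU limit it can suggest far more workers than the container can use. A minimal sketch of a capped worker count follows. The `worker_count` helper and the cap of 8 are assumptions for illustration; `WEB_CONCURRENCY` is the environment variable Gunicorn itself reads as the default for `workers`.

```python
import multiprocessing
import os


def worker_count(cpu_count=None, env=None, max_workers=8):
    """Return a worker count: env override first, else (2 x cores) + 1, capped."""
    env = os.environ if env is None else env
    override = env.get("WEB_CONCURRENCY")
    if override:
        # An explicit setting from the deployment wins over the formula
        return max(1, int(override))
    cores = cpu_count if cpu_count is not None else multiprocessing.cpu_count()
    return max(1, min(cores * 2 + 1, max_workers))


# In gunicorn.conf.py this would replace the plain formula
workers = worker_count()
```

The cap keeps a 64-core host from spawning 129 processes for a pod that is limited to a fraction of those cores; the right cap depends on memory per worker and is something to measure, not guess.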
Health Check Endpoints
Kubernetes distinguishes between liveness and readiness. Your service must too.
# upload_service/routes/health.py
from fastapi import APIRouter, Request, status
from fastapi.responses import JSONResponse
from sqlalchemy import text  # used by text("SELECT 1") below; assumes a SQLAlchemy session factory
import time
health_router = APIRouter()
@health_router.get(
    "/health",
    summary="Liveness probe",
    description="Returns 200 if the process is alive. Does NOT check dependencies.",
    include_in_schema=False,
)
async def liveness():
    """
    Kubernetes liveness probe.
    If this returns non-200, Kubernetes restarts the container.
    Should ONLY check that the process is running - not that downstream services are up.
    A failing liveness probe causes restarts, which can cascade.
    """
    return {"status": "alive", "timestamp": time.time()}
@health_router.get(
    "/readiness",
    summary="Readiness probe",
    description="Returns 200 only if the service can handle traffic. Checks all dependencies.",
    include_in_schema=False,
)
async def readiness(request: Request):
    """
    Kubernetes readiness probe.
    If this returns non-200, Kubernetes stops sending traffic but does NOT restart.
    Check all critical dependencies here.
    """
    checks = {}
    all_healthy = True
    # Check 1: Database
    try:
        factory = request.app.state.db_session_factory
        async with factory() as db:
            await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception as e:
        checks["database"] = f"error: {str(e)[:100]}"
        all_healthy = False
    # Check 2: Redis (if used)
    try:
        redis = request.app.state.redis_client
        await redis.ping()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {str(e)[:100]}"
        all_healthy = False
    # Check 3: gRPC to Classification Service
    try:
        classifier = request.app.state.classifier_client
        await classifier.health_check()
        checks["classifier_service"] = "ok"
    except Exception as e:
        checks["classifier_service"] = f"error: {str(e)[:100]}"
        # Classifier being down makes us not ready for classification requests
        # but we might still want to accept uploads
        # Decision: mark not ready - simpler, prevents partial failures
        all_healthy = False
    # Check 4: Storage (S3)
    try:
        storage = request.app.state.storage
        await storage.health_check()  # List a bucket or check credentials
        checks["storage"] = "ok"
    except Exception as e:
        checks["storage"] = f"error: {str(e)[:100]}"
        all_healthy = False
    response_status = status.HTTP_200_OK if all_healthy else status.HTTP_503_SERVICE_UNAVAILABLE
    return JSONResponse(
        status_code=response_status,
        content={
            "status": "ready" if all_healthy else "not_ready",
            "checks": checks,
            "timestamp": time.time(),
        },
    )
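The readiness handler above awaits each dependency in turn with no timeout, so a single hung dependency can stall the probe for longer than Kubernetes is willing to wait. A minimal sketch of running the checks concurrently with a per-check timeout, using only `asyncio`. The `run_checks` helper, the 2-second default, and the fake checks in the demo are illustrative assumptions, not part of the service.

```python
import asyncio


async def run_checks(checks, timeout=2.0):
    """Run named async health checks concurrently.

    `checks` maps a name to a zero-argument coroutine function.
    Returns (all_healthy, results), where results maps name -> "ok" or an error string.
    """
    async def run_one(name, check):
        try:
            await asyncio.wait_for(check(), timeout=timeout)
            return name, "ok"
        except asyncio.TimeoutError:
            return name, f"error: timed out after {timeout}s"
        except Exception as exc:
            # Report only the exception type to avoid leaking internals
            return name, f"error: {type(exc).__name__}"

    pairs = await asyncio.gather(*(run_one(n, c) for n, c in checks.items()))
    results = dict(pairs)
    return all(value == "ok" for value in results.values()), results


async def _demo():
    async def ok():
        return None

    async def slow():
        await asyncio.sleep(1)  # longer than the demo timeout

    async def broken():
        raise ConnectionError("refused")

    return await run_checks(
        {"database": ok, "redis": slow, "storage": broken}, timeout=0.05
    )


healthy, results = asyncio.run(_demo())
```

In the route, each entry would wrap one of the existing checks (for example a small coroutine that opens a session and runs `SELECT 1`), and `healthy` would select between 200 and 503 exactly as `all_healthy` does now.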
Graceful Shutdown
# upload_service/main.py - Graceful shutdown in lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
    # ... startup ...
    try:
        yield
    finally:
        # ... cleanup ...
        # On a graceful shutdown the server resumes the lifespan here.
        # try/finally makes cleanup run even if an error is raised at the yield.
        pass
# Gunicorn sends SIGTERM, waits graceful_timeout seconds, then SIGKILL
# During this window, in-flight requests can complete
# Dockerfile - use exec form so signals reach the process
# CMD ["gunicorn", "upload_service.main:app", "-c", "gunicorn.conf.py"]
# NOT: CMD ["sh", "-c", "gunicorn ..."] or shell-form CMD gunicorn ... ← sh becomes PID 1, receives SIGTERM, and may not forward it to gunicorn
Interview Patterns
Pattern 1: FastAPI Dependency Injection
Q: How does FastAPI's dependency injection differ from a standard function call?
A: Depends() declares a dependency graph that FastAPI resolves for each request. Within one request, a dependency needed by several dependents is called once and its result is shared (unless use_cache=False). yield dependencies manage resource lifecycles: the code before yield runs before the route, and the code after yield runs after it. If the route raises, the exception is re-raised at the yield, so teardown must sit in a finally block to run on errors. Dependencies can be overridden in tests through app.dependency_overrides, without monkey-patching. With plain function calls you would have to pass every dependency by hand, which makes testing and swapping implementations much harder.
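The teardown behaviour of yield dependencies is easy to get wrong, so here is a stdlib-only model of it. FastAPI treats a yield dependency like a context manager; the sketch below uses `contextlib.contextmanager` to imitate that, and `call_route`, `dep_without_finally`, and `dep_with_finally` are hypothetical names for illustration, not FastAPI APIs.

```python
from contextlib import contextmanager

events = []


def dep_without_finally():
    events.append("open A")
    yield "A"
    events.append("close A")  # skipped when the exception is thrown in at `yield`


def dep_with_finally():
    events.append("open B")
    try:
        yield "B"
    finally:
        events.append("close B")  # runs on success and on failure


def call_route(dependency):
    """Enter the dependency, then fail inside the 'route', as a request might."""
    try:
        with contextmanager(dependency)() as resource:
            raise RuntimeError(f"route failed while using {resource}")
    except RuntimeError:
        pass  # a real app would turn this into an error response


call_route(dep_without_finally)
call_route(dep_with_finally)
print(events)  # ['open A', 'open B', 'close B']
```

Resource A is opened but never closed, which is how connection leaks appear only on error paths. Wrapping the yield in try/finally is the fix.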
Pattern 2: Middleware Ordering
Q: What is the middleware execution order in FastAPI, and where does it bite teams?
A: The last middleware added via app.add_middleware() is the outermost layer, so it executes first when a request arrives (LIFO order) and last on the way out. Teams get bitten when they add RequestIDMiddleware before TimingMiddleware, expecting the timing middleware to have access to the request ID - but TimingMiddleware runs first and the ID isn't set yet. The fix is either to add the middleware in the opposite order (RequestIDMiddleware last, so it is outermost), or to read the request ID from request.state only after awaiting call_next, once the inner middleware has run.
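The ordering can be modelled without FastAPI at all, since each middleware is an ASGI callable that wraps the next. The sketch below is a stdlib-only model under that assumption: `build_stack` imitates the wrapping order (last added ends up outermost) and is not Starlette's actual code, and `make_middleware` is a hypothetical helper.

```python
import asyncio

order = []


def make_middleware(name):
    """Build a tiny pure-ASGI middleware class that records when it runs."""
    class Middleware:
        def __init__(self, app):
            self.app = app

        async def __call__(self, scope, receive, send):
            order.append(f"{name}: before")
            await self.app(scope, receive, send)
            order.append(f"{name}: after")

    return Middleware


async def endpoint(scope, receive, send):
    order.append("endpoint")


def build_stack(app, added_in_order):
    """Wrap `app` so that the middleware added last ends up outermost."""
    for middleware_cls in added_in_order:
        app = middleware_cls(app)
    return app


RequestID = make_middleware("request_id")
Timing = make_middleware("timing")

# Models: add_middleware(RequestID) followed by add_middleware(Timing)
stack = build_stack(endpoint, [RequestID, Timing])
asyncio.run(stack({"type": "http"}, None, None))
print(order)
```

The recorded order is timing, request_id, endpoint on the way in and the reverse on the way out, so any value that `request_id` sets is visible to `timing` only after it has awaited the inner app.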
Pattern 3: Lifespan vs. Startup Events
Q: Why was the @app.on_event("startup") pattern deprecated?
A: Two reasons. First, startup and shutdown are separate functions with no shared scope, so you must use app.state or module-level globals to share resources between them, which is error-prone. Second, if startup raises an exception after partially completing, the shutdown handlers cannot tell which resources were actually initialised. The @asynccontextmanager lifespan pattern addresses both - startup and shutdown share one function scope, and wrapping the yield in try/finally makes cleanup run even when an error is raised while the application is running.
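A minimal sketch of the shared-scope argument, runnable with the standard library alone. `FakePool` is a hypothetical stand-in for a database pool, and the `app` here is a plain dict standing in for the FastAPI instance and its `app.state`.

```python
import asyncio
from contextlib import asynccontextmanager

log = []


class FakePool:
    """Hypothetical stand-in for a database pool or HTTP client."""

    def __init__(self, name):
        self.name = name
        log.append(f"open {name}")

    async def close(self):
        log.append(f"close {self.name}")


@asynccontextmanager
async def lifespan(app):
    pool = FakePool("db")      # startup: a local variable, no globals needed
    try:
        app["pool"] = pool     # expose to handlers (app.state in FastAPI)
        yield
    finally:
        await pool.close()     # shutdown: same scope, runs even on error


async def serve():
    app = {}
    try:
        async with lifespan(app):
            log.append("serving")
            raise RuntimeError("server crashed")
    except RuntimeError:
        log.append("crash handled")


asyncio.run(serve())
print(log)  # ['open db', 'serving', 'close db', 'crash handled']
```

Because `pool` is a local variable, the cleanup code can only ever see a pool that was actually created, which is the guarantee the separate startup and shutdown handlers could not give.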
Pattern 4: BackgroundTasks Limitations
Q: A colleague adds a BackgroundTask to send an email after an upload. It works in development but emails are occasionally lost in production. Why?
A: BackgroundTasks runs in the same process as the web server. If the process is restarted (deployment, OOM kill, pod eviction) while a background task is in progress, the task is silently lost. For reliable fire-and-forget work, the task must be durably persisted - either to a message queue (Kafka, RabbitMQ) or a task queue with a persistent broker (Celery with Redis/RabbitMQ). The upload route should publish an event to a message queue, and a separate consumer process should send the email.
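To make "durably persisted" concrete, here is a small sketch that swaps the message queue for a SQLite outbox table, purely so the example runs with the standard library. It is a stand-in for a real broker, not a recommendation to replace one; the table layout and the `enqueue_email` and `process_pending` helpers are assumptions for illustration.

```python
import sqlite3


def init_outbox(conn):
    conn.execute(
        "CREATE TABLE IF NOT EXISTS outbox ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "recipient TEXT NOT NULL, "
        "status TEXT NOT NULL DEFAULT 'pending')"
    )
    conn.commit()


def enqueue_email(conn, recipient):
    """Called from the upload route: persist the job before responding."""
    cursor = conn.execute("INSERT INTO outbox (recipient) VALUES (?)", (recipient,))
    conn.commit()
    return cursor.lastrowid


def process_pending(conn, send):
    """Called by a separate worker process: send, then mark the job as sent."""
    rows = conn.execute(
        "SELECT id, recipient FROM outbox WHERE status = 'pending' ORDER BY id"
    ).fetchall()
    for job_id, recipient in rows:
        send(recipient)
        conn.execute("UPDATE outbox SET status = 'sent' WHERE id = ?", (job_id,))
        conn.commit()
    return len(rows)


# ":memory:" keeps the demo self-contained; real durability needs a file or a broker
conn = sqlite3.connect(":memory:")
init_outbox(conn)
enqueue_email(conn, "user@example.com")

sent = []
processed_first = process_pending(conn, sent.append)
processed_second = process_pending(conn, sent.append)
```

The job outlives the web process because it is committed before the response is returned. If the worker dies between sending and marking the row, the email is sent again on the next pass, so this gives at-least-once delivery and the send step should tolerate duplicates.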
Pattern 5: Exception Handler Registration Order
Q: You have both UploadServiceError and Exception handlers registered. A DocumentNotFoundError (which inherits from UploadServiceError) is raised. Which handler fires?
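A: The UploadServiceError handler fires, and it does so regardless of registration order. Starlette, which FastAPI delegates to, looks up the handler by walking the raised exception's class hierarchy (its MRO) from most specific to most general and uses the first class that has a registered handler. DocumentNotFoundError has no handler of its own, so the lookup reaches UploadServiceError before it reaches Exception. A handler registered for Exception is also treated specially: it is installed on the outermost ServerErrorMiddleware as the last-resort handler for unhandled errors. The practical rule is to register a handler for each exception type that needs distinct behaviour and keep the Exception handler as a generic 500 fallback, rather than relying on registration order.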
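A stdlib-only model of how an exception is matched to a handler by walking the class hierarchy. `lookup_handler` is a simplified sketch written for this example, and the handlers are plain strings so the result is easy to inspect.

```python
class UploadServiceError(Exception):
    pass


class DocumentNotFoundError(UploadServiceError):
    pass


def lookup_handler(handlers, exc):
    """Return the handler for the closest registered ancestor of type(exc)."""
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls]
    return None


# The generic handler is registered FIRST, the specific one second
handlers = {}
handlers[Exception] = "generic_handler"
handlers[UploadServiceError] = "upload_service_handler"

chosen = lookup_handler(handlers, DocumentNotFoundError("doc_123"))
fallback = lookup_handler(handlers, ValueError("unrelated"))
print(chosen, fallback)  # upload_service_handler generic_handler
```

The MRO of DocumentNotFoundError lists UploadServiceError before Exception, so the specific handler wins even though the generic one was registered first.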
